1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static rx.Observable.concat;
19 import static rx.Observable.just;
20 import static rx.Observable.zip;
21 import rx.Observable;
22 import rx.functions.Func1;
23 import rx.functions.Func2;
24 import rx.internal.util.UtilityFunctions;
25
26
27
28
29
30 public final class OperatorSequenceEqual {
31 private OperatorSequenceEqual() {
32 throw new IllegalStateException("No instances!");
33 }
34
35
36 private static final Object LOCAL_ONCOMPLETED = new Object();
37 static <T> Observable<Object> materializeLite(Observable<T> source) {
38 return concat(
39 source.map(new Func1<T, Object>() {
40
41 @Override
42 public Object call(T t1) {
43 return t1;
44 }
45
46 }), just(LOCAL_ONCOMPLETED));
47 }
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public static <T> Observable<Boolean> sequenceEqual(
64 Observable<? extends T> first, Observable<? extends T> second,
65 final Func2<? super T, ? super T, Boolean> equality) {
66 Observable<Object> firstObservable = materializeLite(first);
67 Observable<Object> secondObservable = materializeLite(second);
68
69 return zip(firstObservable, secondObservable,
70 new Func2<Object, Object, Boolean>() {
71
72 @Override
73 @SuppressWarnings("unchecked")
74 public Boolean call(Object t1, Object t2) {
75 boolean c1 = t1 == LOCAL_ONCOMPLETED;
76 boolean c2 = t2 == LOCAL_ONCOMPLETED;
77 if (c1 && c2) {
78 return true;
79 }
80 if (c1 || c2) {
81 return false;
82 }
83
84 return equality.call((T)t1, (T)t2);
85 }
86
87 }).all(UtilityFunctions.<Boolean> identity());
88 }
89 }